feat(cache): add in flight deduping #4459

Open
MasterPtato wants to merge 1 commit into 03-18-fix_cache_clean_up_lib from 03-19-feat_cache_add_in_flight_deduping

Conversation

@MasterPtato (Contributor)

Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

@railway-app

railway-app bot commented Mar 19, 2026

🚅 Deployed to the rivet-pr-4459 environment in rivet-frontend

Service             Status                     Web   Updated (UTC)
frontend-cloud      😴 Sleeping (View Logs)    Web   Mar 24, 2026 at 7:53 am
frontend-inspector  😴 Sleeping (View Logs)    Web   Mar 24, 2026 at 1:03 am
website             ❌ Build Failed (View Logs) Web   Mar 19, 2026 at 8:53 pm
mcp-hub             ✅ Success (View Logs)      Web   Mar 19, 2026 at 8:52 pm
ladle               ❌ Build Failed (View Logs) Web   Mar 19, 2026 at 8:51 pm

Contributor Author

MasterPtato commented Mar 19, 2026

MasterPtato force-pushed the 03-18-fix_cache_clean_up_lib branch from 175706a to 17d21f5 on March 19, 2026 at 20:53
MasterPtato force-pushed the 03-19-feat_cache_add_in_flight_deduping branch from 75e1e38 to bc5d6d3 on March 19, 2026 at 20:53
@pkg-pr-new

pkg-pr-new bot commented Mar 19, 2026

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@4459

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@4459

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@4459

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@4459

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@4459

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@4459

@rivetkit/sqlite-vfs

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sqlite-vfs@4459

@rivetkit/traces

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/traces@4459

@rivetkit/workflow-engine

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/workflow-engine@4459

@rivetkit/virtual-websocket

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/virtual-websocket@4459

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@4459

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@4459

commit: 6570bf2

@claude

claude bot commented Mar 19, 2026

Code Review: feat(cache): add in-flight deduplication

The overall approach is solid — using scc::HashMap for lock-free concurrent tracking of in-flight requests and tokio::sync::broadcast for fan-out notification is well-suited to this problem. The test suite is comprehensive and the GetterCtx refactor to HashMap<K, GetterCtxEntry<V>> is a clear improvement.

A few issues worth addressing before merge:


Critical: Broadcast not sent when getter resolves no values

In req_config.rs, the broadcast is only sent when entries_values is non-empty:

if !entries_values.is_empty() {
    if let Err(err) = cache.driver.set(&base_key_clone, entries_values).await {
        tracing::error!(?err, "failed to write to cache");
    }
    let _ = broadcast_tx.send(());
}

// Release leases
for key in leased_keys {
    self.cache.in_flight.remove_async(&cache_key).await;
}

If the getter returns no values for the leased keys (e.g. the entities don't exist), waiting subscribers never receive the signal and block for the full IN_FLIGHT_TIMEOUT (5 seconds) before falling back to their own getter. The broadcast should be sent unconditionally before the lease cleanup:

if !entries_values.is_empty() {
    if let Err(err) = cache.driver.set(&base_key_clone, entries_values).await {
        tracing::error!(?err, "failed to write to cache");
    }
}
// Always signal waiters whether or not values were written
let _ = broadcast_tx.send(());

for key in leased_keys { ... }

Critical: Lease not released on getter error

If tokio::try_join! returns an error (e.g. the leased getter fails), the function returns early via ? and the lease cleanup code is never reached:

let (ctx2, ctx3) = tokio::try_join!(...)?;  // early return skips cleanup

// never reached on error:
for key in leased_keys {
    self.cache.in_flight.remove_async(&cache_key).await;
}

Stale entries remain in in_flight indefinitely (scc::HashMap has no TTL). All future requests for those keys will subscribe to the dead channel, wait 5 seconds, then fall back — permanently degrading performance for those keys until process restart.

A scopeguard-style defer or restructuring cleanup so it runs regardless of the join result would fix this.


Minor: No cleanup on future cancellation

Related to the above: if the enclosing async task is dropped while fetch_all_convert is in progress (e.g. an HTTP request is cancelled), the in-flight entries are never removed. Cleanup should be cancellation-safe, ideally via a guard that runs on drop.
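Both cleanup issues above can be solved with a small RAII guard whose Drop impl releases the leases, since Drop runs on normal exit, on early `?` returns, and when the future is cancelled. A minimal std-only sketch (names such as `InFlightGuard` are illustrative, and a plain `Mutex<HashMap>` stands in for `scc::HashMap`):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Guard that removes its keys from the shared in-flight map when dropped,
/// so lease cleanup runs regardless of how the enclosing scope exits.
struct InFlightGuard {
    in_flight: Arc<Mutex<HashMap<String, ()>>>,
    keys: Vec<String>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let mut map = self.in_flight.lock().unwrap();
        for key in &self.keys {
            map.remove(key);
        }
    }
}

/// Simulates a getter that fails after the lease was taken; explicit cleanup
/// after the error is never reached, but the guard's Drop still runs.
fn cleanup_runs_on_error() -> bool {
    let in_flight: Arc<Mutex<HashMap<String, ()>>> =
        Arc::new(Mutex::new(HashMap::new()));
    in_flight.lock().unwrap().insert("key-a".to_string(), ());

    let result: Result<(), &str> = (|| {
        let _guard = InFlightGuard {
            in_flight: in_flight.clone(),
            keys: vec!["key-a".to_string()],
        };
        // Getter fails here; the closure returns early.
        Err("getter failed")
    })();

    // The lease was released even though no cleanup code ran after the error.
    result.is_err() && in_flight.lock().unwrap().is_empty()
}

fn main() {
    assert!(cleanup_runs_on_error());
    println!("lease released despite getter error");
}
```

The same pattern covers cancellation: if the task holding the guard is dropped mid-await, Drop still fires and the in-flight entries are removed.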


Nit: buffer_unordered(1024) magic number

Since all these futures are just awaiting a broadcast signal, a named constant or brief comment explaining the intent would improve clarity.


Nit: Broadcast channel buffer size

let (broadcast_tx, _) = broadcast::channel(16);

Only one message is ever sent, so the buffer size doesn't affect correctness. Lagged subscribers correctly fall through to the getter via .ok().flatten().is_some() returning false. A brief comment noting this would help readers.


Required changes:

  1. Send the broadcast unconditionally, outside the if !entries_values.is_empty() block
  2. Ensure lease cleanup runs even when try_join! errors and on future cancellation

The rest of the implementation looks correct. The merge() approach is safe since leased_keys and waiting_keys are always disjoint. The waiting path correctly uses resolve_from_cache() so re-fetched values aren't redundantly written back to cache.
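For context, the core dedup pattern the PR implements can be sketched with only the standard library — a `Mutex` plus `Condvar` standing in for `scc::HashMap` and the tokio broadcast channel; all names here are illustrative, not the PR's actual API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Minimal in-flight deduplication: the first caller for a key runs the
// getter; concurrent callers wait for its result instead of recomputing.
struct DedupCache {
    state: Mutex<CacheState>,
    cond: Condvar,
}

struct CacheState {
    values: HashMap<String, u64>,
    in_flight: HashMap<String, ()>,
}

impl DedupCache {
    fn new() -> Self {
        DedupCache {
            state: Mutex::new(CacheState {
                values: HashMap::new(),
                in_flight: HashMap::new(),
            }),
            cond: Condvar::new(),
        }
    }

    fn get_or_compute(&self, key: &str, getter: impl FnOnce() -> u64) -> u64 {
        let mut state = self.state.lock().unwrap();
        loop {
            if let Some(v) = state.values.get(key) {
                return *v; // cache hit
            }
            if state.in_flight.contains_key(key) {
                // Another caller holds the lease; wait for its signal.
                state = self.cond.wait(state).unwrap();
            } else {
                state.in_flight.insert(key.to_string(), ()); // take the lease
                break;
            }
        }
        drop(state); // release the lock while the getter runs
        let value = getter();
        let mut state = self.state.lock().unwrap();
        state.values.insert(key.to_string(), value);
        state.in_flight.remove(key);
        self.cond.notify_all(); // wake all waiting callers
        value
    }
}

fn main() {
    let cache = Arc::new(DedupCache::new());
    let calls = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cache = cache.clone();
            let calls = calls.clone();
            thread::spawn(move || {
                cache.get_or_compute("user:1", || {
                    *calls.lock().unwrap() += 1;
                    thread::sleep(std::time::Duration::from_millis(50));
                    42
                })
            })
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 42);
    }
    // All four callers get the value; the getter ran only once.
    println!("getter calls: {}", *calls.lock().unwrap());
}
```

The review's two required changes map directly onto this sketch: the signal (`notify_all` here, the broadcast send in the PR) must fire even when the getter produces no values, and the lease removal must run on every exit path, not just the happy one.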

MasterPtato force-pushed the 03-19-feat_cache_add_in_flight_deduping branch from bc5d6d3 to b395fae on March 21, 2026 at 01:55
MasterPtato force-pushed the 03-18-fix_cache_clean_up_lib branch from 17d21f5 to de18421 on March 21, 2026 at 01:55
MasterPtato force-pushed the 03-18-fix_cache_clean_up_lib branch from de18421 to e65f84d on March 24, 2026 at 00:30
MasterPtato force-pushed the 03-19-feat_cache_add_in_flight_deduping branch from b395fae to 6570bf2 on March 24, 2026 at 00:30
MasterPtato mentioned this pull request on Mar 24, 2026
MasterPtato force-pushed the 03-19-feat_cache_add_in_flight_deduping branch from 6570bf2 to b46226c on March 24, 2026 at 00:36
MasterPtato force-pushed the 03-18-fix_cache_clean_up_lib branch from e65f84d to 65200cf on March 24, 2026 at 00:36